Merged

49 commits
0e18ca7
Use pubsub instead of timeout.
elibol May 16, 2018
fc2572c
Correct status message.
elibol May 17, 2018
2e8af60
eric's feedback!
elibol May 17, 2018
b57a548
Changes from Stephanie's review.
elibol May 17, 2018
a128698
object directory changes for ray.wait.
elibol May 18, 2018
fa5c32d
Merge branch 'master' into om_pubsub
elibol May 18, 2018
0ccf46b
Merge branch 'master' into om_pubsub
elibol May 18, 2018
b02de4f
Merge branch 'om_pubsub' into om_wait
elibol May 18, 2018
f9a9e16
wait without testing or timeout=0.
elibol May 18, 2018
15b7f61
Handle remaining cases for wait.
elibol May 18, 2018
a22263b
linting
elibol May 18, 2018
8ab41f0
added tests of om wait imp.
elibol May 18, 2018
98bacfa
add local test.
elibol May 18, 2018
d518a89
Merge branch 'master' into om_wait
elibol May 21, 2018
53f33e0
plasma imp.
elibol May 24, 2018
8ef35f7
block worker as with pull.
elibol May 29, 2018
6e10f9e
local scheduler implementation of wait.
elibol May 30, 2018
9a95c65
with passing tests.
elibol May 30, 2018
aa12bd7
minor adjustments.
elibol May 30, 2018
9e1602d
Merge branch 'master' into om_wait_local_scheduler
elibol May 30, 2018
304b39c
handle return statuses.
elibol May 30, 2018
5d63bb3
enable more tests.
elibol May 30, 2018
cf1fdb2
add test for existing num_returns semantics, and maintain existing nu…
elibol May 31, 2018
531d024
move error handling to both code paths.
elibol May 31, 2018
d0d3ea4
implementing another round of feedback.
elibol May 31, 2018
62ae832
Comment on OM tests.
elibol May 31, 2018
67eef67
remove check for length zero list.
elibol Jun 1, 2018
0796a17
remove elapsed.
elibol Jun 1, 2018
dd9f0db
Preserve input/output order.
elibol Jun 1, 2018
9d4ed2b
debias local objects.
elibol Jun 1, 2018
541b88c
Merge branch 'master' into om_wait_local_scheduler
elibol Jun 1, 2018
58af739
use common helper function in object directory.
elibol Jun 1, 2018
d9ef29b
updated documentation
elibol Jun 1, 2018
fa1928b
linting.
elibol Jun 1, 2018
d41b1d0
handle return status.
elibol Jun 1, 2018
aeaab5b
simplify order preservation test + fix valgrind test error.
elibol Jun 1, 2018
048f45f
update name of final Lookup callback.
elibol Jun 2, 2018
0aa7525
Merge branch 'master' into om_wait_local_scheduler
elibol Jun 2, 2018
833939f
linting
elibol Jun 2, 2018
8e1947c
c++ style casting.
elibol Jun 2, 2018
83d04dd
linting.
elibol Jun 2, 2018
080282f
linting.
elibol Jun 2, 2018
a58f5c9
incorporate second round of feedback.
elibol Jun 5, 2018
c6d8ba5
correct python tests.
elibol Jun 5, 2018
7d8d756
test comments.
elibol Jun 5, 2018
6b6e2f3
incorporate reviews.
elibol Jun 6, 2018
3a86c93
Fixes with regression tests.
elibol Jun 6, 2018
1a99f25
update documentation.
elibol Jun 6, 2018
00eafd7
reference to avoid copy.
elibol Jun 6, 2018
8 changes: 6 additions & 2 deletions src/local_scheduler/local_scheduler_client.cc
@@ -192,12 +192,13 @@ std::pair<std::vector<ObjectID>, std::vector<ObjectID>> local_scheduler_wait(
LocalSchedulerConnection *conn,
const std::vector<ObjectID> &object_ids,
int num_returns,
int64_t timeout,
int64_t timeout_milliseconds,
bool wait_local) {
Collaborator: does wait_local work?

Contributor Author: No; I just added the piping for the argument. The back-end returns an unimplemented status if it's set to true. I have an idea of how to implement this whenever we'd like to add it.

// Write request.
flatbuffers::FlatBufferBuilder fbb;
auto message = ray::protocol::CreateWaitRequest(
fbb, to_flatbuf(fbb, object_ids), num_returns, timeout, wait_local);
fbb, to_flatbuf(fbb, object_ids), num_returns, timeout_milliseconds,
wait_local);
fbb.Finish(message);
write_message(conn->conn, ray::protocol::MessageType_WaitRequest,
fbb.GetSize(), fbb.GetBufferPointer());
@@ -206,6 +207,7 @@ std::pair<std::vector<ObjectID>, std::vector<ObjectID>> local_scheduler_wait(
int64_t reply_size;
uint8_t *reply;
Collaborator: This is a memory leak, right? reply needs to get freed somewhere (there is a malloc in read_message).

Contributor Author: Added free at the end.

read_message(conn->conn, &type, &reply_size, &reply);
Collaborator: add

RAY_CHECK(type == MessageType_WaitReply);

RAY_CHECK(type == ray::protocol::MessageType_WaitReply);
auto reply_message = flatbuffers::GetRoot<ray::protocol::WaitReply>(reply);
// Convert result.
std::pair<std::vector<ObjectID>, std::vector<ObjectID>> result;
@@ -219,5 +221,7 @@ std::pair<std::vector<ObjectID>, std::vector<ObjectID>> local_scheduler_wait(
ObjectID object_id = ObjectID::from_binary(remaining->Get(i)->str());
result.second.push_back(object_id);
}
/* Free the original message from the local scheduler. */
free(reply);
return result;
}
5 changes: 3 additions & 2 deletions src/local_scheduler/local_scheduler_client.h
@@ -175,15 +175,16 @@ void local_scheduler_set_actor_frontier(LocalSchedulerConnection *conn,
/// \param conn The connection information.
/// \param object_ids The objects to wait for.
/// \param num_returns The number of objects to wait for.
/// \param timeout The duration to wait before returning.
/// \param timeout_milliseconds Duration, in milliseconds, to wait before
/// returning.
/// \param wait_local Whether to wait for objects to appear on this node.
Contributor: This argument doesn't seem to match the current semantics for ray.wait. Is there a use case for this API?

Contributor Author: wait_local is something @robertnishihara said we were planning to add at some point, so I included the piping for it.

/// \return A pair with the first element containing the object ids that were
/// found, and the second element the objects that were not found.
std::pair<std::vector<ObjectID>, std::vector<ObjectID>> local_scheduler_wait(
LocalSchedulerConnection *conn,
const std::vector<ObjectID> &object_ids,
int num_returns,
int64_t timeout,
int64_t timeout_milliseconds,
bool wait_local);

#endif
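
To make the updated signature concrete, here is a hypothetical caller-side sketch (illustration only; wait_for_any is an invented helper, the include path is assumed, and the connection is assumed to come from the usual worker setup):

#include "local_scheduler_client.h"

// Hypothetical helper, for illustration only: return as soon as any one of
// `object_ids` is available, or after one second, whichever comes first.
std::pair<std::vector<ObjectID>, std::vector<ObjectID>> wait_for_any(
    LocalSchedulerConnection *conn, const std::vector<ObjectID> &object_ids) {
  // wait_local stays false; the local-only variant is not implemented yet
  // (see the discussion above).
  return local_scheduler_wait(conn, object_ids,
                              /*num_returns=*/1,
                              /*timeout_milliseconds=*/1000,
                              /*wait_local=*/false);
}

The returned pair follows the documented convention: the first vector holds the objects that were found, the second the ones that were not.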
12 changes: 7 additions & 5 deletions src/ray/object_manager/object_directory.cc
@@ -112,15 +112,17 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i
status = gcs_client_->object_table().RequestNotifications(
JobID::nil(), object_id, gcs_client_->client_table().GetLocalClientId());
}
if (listeners_[object_id].callbacks.count(callback_id) > 0) {
auto &listener_state = listeners_.find(object_id)->second;
// TODO(hme): Make this fatal after implementing Pull suppression.
if (listener_state.callbacks.count(callback_id) > 0) {
return ray::Status::OK();
}
listeners_[object_id].callbacks.emplace(callback_id, callback);
listener_state.callbacks.emplace(callback_id, callback);
// Immediately notify of found object locations.
if (!listeners_[object_id].current_object_locations.empty()) {
if (!listener_state.current_object_locations.empty()) {
std::vector<ClientID> client_id_vec(
listeners_[object_id].current_object_locations.begin(),
listeners_[object_id].current_object_locations.end());
listener_state.current_object_locations.begin(),
listener_state.current_object_locations.end());
callback(client_id_vec, object_id);
}
return status;
100 changes: 58 additions & 42 deletions src/ray/object_manager/object_manager.cc
@@ -355,51 +355,56 @@ ray::Status ObjectManager::Cancel(const ObjectID &object_id) {
return status;
}

ray::Status ObjectManager::Wait(const std::vector<ObjectID> &object_ids, int64_t wait_ms,
uint64_t num_required_objects, bool wait_local,
const WaitCallback &callback) {
ray::Status ObjectManager::Wait(const std::vector<ObjectID> &object_ids,
int64_t timeout_ms, uint64_t num_required_objects,
bool wait_local, const WaitCallback &callback) {
UniqueID wait_id = UniqueID::from_random();
Collaborator: We've had trouble with from_random being too expensive in the past.

Contributor Author: Yes, I think it was a problem for Pull, when we were polling for locations with a low timeout. It's called once for each call to Wait, so I think it is okay. Is there an alternative to from_random? How are unique ObjectIDs created?


if (wait_local) {
return ray::Status::NotImplemented("Wait for local objects is not yet implemented.");
}

RAY_CHECK(wait_ms >= 0);
RAY_CHECK(timeout_ms >= 0 || timeout_ms == -1);
RAY_CHECK(num_required_objects != 0);
Contributor: Same here, we could return [], object_ids immediately.

RAY_CHECK(num_required_objects <= object_ids.size());
if (object_ids.size() == 0) {
callback(std::vector<ObjectID>(), std::vector<ObjectID>());
}

// Initialize fields.
active_wait_requests_.emplace(wait_id, WaitState(*main_service_, wait_ms, callback));
active_wait_requests_.emplace(wait_id, WaitState(*main_service_, timeout_ms, callback));
auto &wait_state = active_wait_requests_.find(wait_id)->second;
wait_state.object_id_order = object_ids;
wait_state.wait_ms = wait_ms;
wait_state.timeout_ms = timeout_ms;
wait_state.num_required_objects = num_required_objects;
for (auto &oid : object_ids) {
if (local_objects_.count(oid) > 0) {
wait_state.found.insert(oid);
for (auto &object_id : object_ids) {
Contributor: const auto wherever you can.

if (local_objects_.count(object_id) > 0) {
wait_state.found.insert(object_id);
} else {
wait_state.remaining.insert(oid);
wait_state.remaining.insert(object_id);
}
}

if (wait_state.remaining.empty()) {
Collaborator: If we're going to include this special case here, then the condition should probably be something like wait_state.found.size() >= num_required_objects || wait_ms == 0.

That said, I think it makes sense to handle this case after firing off the LookupLocations calls.

Contributor Author: We invoke the lookup calls immediately after checking which objects are local in order to obtain current information about the locations of remote objects. This allows us to collect information about all given objects, regardless of their location.

If we only invoke WaitComplete when wait_state.found.size() >= num_required_objects || wait_ms == 0, we may invoke it before checking which remote objects are ready, which would bias the result toward locally available objects.

If we fire LookupLocations and check wait_state.found.size() >= num_required_objects || wait_ms == 0 independently, and the condition is true, the calls to LookupLocations will do nothing, because the callback to Wait will already have been invoked by the time any of the LookupLocations handlers run for the given wait_id.

Contributor: Can you add a comment to this if block detailing the reason for this condition? I had the same question as Robert when reading this code, so it'd be good to explain to the reader why you still have to do lookups for the remaining objects.

WaitComplete(wait_id);
} else {
for (auto &oid : wait_state.remaining) {
// We invoke lookup calls immediately after checking which objects are local to
// obtain current information about the location of remote objects. Thus,
// we obtain information about all given objects, regardless of their location.
// This is required to ensure we do not bias returning locally available objects
// as ready whenever Wait is invoked with a mixture of local and remote objects.
for (auto &object_id : wait_state.remaining) {
// Lookup remaining objects.
wait_state.requested_objects.insert(oid);
wait_state.requested_objects.insert(object_id);
RAY_CHECK_OK(object_directory_->LookupLocations(
oid, [this, wait_id](const std::vector<ClientID> &client_ids,
const ObjectID &object_id) {
object_id, [this, wait_id](const std::vector<ClientID> &client_ids,
const ObjectID &lookup_object_id) {
auto &wait_state = active_wait_requests_.find(wait_id)->second;
Collaborator: Couldn't the find here fail to locate the wait_id?

Contributor Author: The only way to reach the point where active_wait_requests_.erase(wait_id) is invoked is within WaitLookupComplete, which is only invoked when requested_objects.empty(), at which point no more handlers will be invoked with the same wait_id.

if (!client_ids.empty()) {
wait_state.remaining.erase(object_id);
wait_state.found.insert(object_id);
wait_state.remaining.erase(lookup_object_id);
wait_state.found.insert(lookup_object_id);
}
Collaborator: What would it mean if client_ids.empty() == true? That would mean that the object has been evicted? How do we want to handle that case?

Contributor Author: It may mean that the object has been evicted. It may also mean that no entries exist for the object. In both cases, after the round of lookups, if we need to wait for more objects, SubscribeObjectLocations will be invoked on objects for which client_ids.empty() == true (in AllWaitLookupsComplete). The handler to SubscribeObjectLocations will be invoked when !client_ids.empty().

wait_state.requested_objects.erase(object_id);
wait_state.requested_objects.erase(lookup_object_id);
if (wait_state.requested_objects.empty()) {
AllWaitLookupsComplete(wait_id);
}
@@ -412,47 +417,58 @@
void ObjectManager::AllWaitLookupsComplete(const UniqueID &wait_id) {
auto &wait_state = active_wait_requests_.find(wait_id)->second;
if (wait_state.found.size() >= wait_state.num_required_objects ||
wait_state.wait_ms == 0) {
wait_state.timeout_ms == 0) {
// Requirements already satisfied.
WaitComplete(wait_id);
} else {
for (auto &oid : wait_state.remaining) {
// Subscribe to objects in order to ensure Wait-related tests are deterministic.
for (auto &object_id : wait_state.object_id_order) {
Contributor: const auto wherever you can.

if (wait_state.remaining.count(object_id) == 0) {
Contributor: The bug that I described in the earlier comment is still an issue here. wait_state is a reference to the value at active_wait_requests_. The reference will become invalid if the entry is erased from active_wait_requests_ between iterations of this for loop, so this line can produce undefined behavior.

Contributor Author: This is the relevant check that corrects the bug you described earlier: active_wait_requests_.find(wait_id) == active_wait_requests_.end()

Contributor: Yes, I don't think it will break in that particular way anymore, but undefined behavior is still possible because of the reference to wait_state. Same underlying issue, but it will break at a different line.

Contributor Author: Nice catch. I've made the fix and added a regression test.

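To illustrate the hazard being discussed, here is a self-contained toy example (plain C++, no Ray types, invented names) showing why a cached reference into a map must not be used after an operation that may erase the entry:

#include <iostream>
#include <string>
#include <unordered_map>

std::unordered_map<int, std::string> requests = {{1, "pending"}};

// Stands in for a callback that may complete the request and erase it,
// possibly within the same call stack.
void maybe_complete(int id) { requests.erase(id); }

int main() {
  auto &state = requests.find(1)->second;  // reference into the map
  maybe_complete(1);                       // may erase the entry we refer to
  // Using `state` now would be undefined behavior. Re-checking the map first
  // (as the fix above does with active_wait_requests_.find(wait_id)) avoids it.
  if (requests.find(1) == requests.end()) {
    std::cout << "request already completed; stop using the stale reference"
              << std::endl;
    return 0;
  }
  std::cout << state << std::endl;
  return 1;
}
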
continue;
}
// Subscribe to object notifications.
wait_state.requested_objects.insert(oid);
if (active_wait_requests_.find(wait_id) == active_wait_requests_.end()) {
// This is possible if an object's location is obtained immediately,
// within the current callstack. In this case, WaitComplete has been
// invoked already, so we're done.
return;
}
wait_state.requested_objects.insert(object_id);
RAY_CHECK_OK(object_directory_->SubscribeObjectLocations(
Collaborator: Why do SubscribeObjectLocations here in addition to the LookupLocations in Wait? Wouldn't it make sense to just do one of those?

Contributor Author: The lookup calls return immediately, regardless of whether there's a known location of the object. After the lookup calls, if the conditions of the wait call are still not satisfied, we start the timer and subscribe to each remaining object. The subscription callback will be invoked only when an object's location is found. We wait until the minimum number of objects is found or the timeout is triggered.

wait_id, oid, [this, wait_id](const std::vector<ClientID> &client_ids,
const ObjectID &object_id) {
wait_id, object_id, [this, wait_id](const std::vector<ClientID> &client_ids,
const ObjectID &subscribe_object_id) {
auto object_id_wait_state = active_wait_requests_.find(wait_id);
RAY_CHECK(object_id_wait_state != active_wait_requests_.end());
auto &wait_state = active_wait_requests_.find(wait_id)->second;
Contributor: There is an issue with the way this code is currently structured, because the callback registered in SubscribeObjectLocations may now get called directly. It's possible that right now we will not actually see a bug, but that is only because of the specific ordering of calls made on the object directory by the object manager, and that seems quite brittle. Here is a scenario where I think the code would break:

  1. SubscribeObjectLocations gets called on objects A and B (e.g., for a Pull, or for a different Wait). Locations for both are cached in the object directory.
  2. Wait is called on objects A and B, with 1 object required.
  3. In the same call stack, SubscribeObjectLocations is called on object A. The cached locations are found, this callback fires, and the wait request completes and is erased from active_wait_requests_.
  4. Again, in the same call stack, SubscribeObjectLocations is called on object B. active_wait_requests_.find(wait_id) will fail (silently).

if (wait_state.remaining.count(object_id) != 0) {
wait_state.remaining.erase(object_id);
wait_state.found.insert(object_id);
}
wait_state.requested_objects.erase(object_id);
RAY_CHECK_OK(
object_directory_->UnsubscribeObjectLocations(wait_id, object_id));
RAY_CHECK(wait_state.remaining.erase(subscribe_object_id));
wait_state.found.insert(subscribe_object_id);
wait_state.requested_objects.erase(subscribe_object_id);
RAY_CHECK_OK(object_directory_->UnsubscribeObjectLocations(
wait_id, subscribe_object_id));
if (wait_state.found.size() >= wait_state.num_required_objects) {
WaitComplete(wait_id);
}
}));
}
// Set timeout.
// TODO (hme): If we need to just wait for all objects independent of time
// (i.e. infinite wait time), determine what the value of wait_ms should be and
// skip this call. WaitComplete will be invoked when all objects have locations.
wait_state.timeout_timer->async_wait(
[this, wait_id](const boost::system::error_code &error_code) {
if (error_code.value() != 0) {
return;
}
WaitComplete(wait_id);
});
if (wait_state.timeout_ms != -1) {
wait_state.timeout_timer->async_wait(
[this, wait_id](const boost::system::error_code &error_code) {
if (error_code.value() != 0) {
return;
}
WaitComplete(wait_id);
});
}
}
}

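For readers less familiar with the timer driving the timeout path above, here is a minimal standalone sketch of the Boost.Asio deadline_timer pattern (not Ray code; the 500 ms value and the printed message are invented for illustration):

#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <iostream>

int main() {
  boost::asio::io_service service;
  // Arm a one-shot timer the same way WaitState does: fire once after timeout_ms.
  boost::asio::deadline_timer timer(service, boost::posix_time::milliseconds(500));
  timer.async_wait([](const boost::system::error_code &error_code) {
    if (error_code.value() != 0) {
      // A non-zero code means the timer was cancelled (e.g., the wait finished early).
      return;
    }
    std::cout << "timeout fired" << std::endl;  // stands in for WaitComplete(wait_id)
  });
  service.run();  // blocks until the handler has run
  return 0;
}
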
void ObjectManager::WaitComplete(const UniqueID &wait_id) {
auto &wait_state = active_wait_requests_.find(wait_id)->second;
// If we complete with outstanding requests, then wait_ms should be non-zero.
RAY_CHECK(!(wait_state.requested_objects.size() > 0) || wait_state.wait_ms > 0);
// If we complete with outstanding requests, then timeout_ms should be non-zero or -1
// (infinite wait time).
if (!wait_state.requested_objects.empty()) {
RAY_CHECK(wait_state.timeout_ms > 0 || wait_state.timeout_ms == -1);
}
// Unsubscribe to any objects that weren't found in the time allotted.
for (auto &object_id : wait_state.requested_objects) {
RAY_CHECK_OK(object_directory_->UnsubscribeObjectLocations(wait_id, object_id));
17 changes: 11 additions & 6 deletions src/ray/object_manager/object_manager.h
@@ -157,11 +157,13 @@ class ObjectManager : public ObjectManagerInterface {
/// \param callback Invoked when either timeout_ms is satisfied OR num_ready_objects
/// is satisfied.
/// \return Status of whether the wait successfully initiated.
ray::Status Wait(const std::vector<ObjectID> &object_ids, int64_t wait_ms,
ray::Status Wait(const std::vector<ObjectID> &object_ids, int64_t timeout_ms,
uint64_t num_required_objects, bool wait_local,
const WaitCallback &callback);

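As a hypothetical call site for this interface (illustration only; the callback is assumed to receive the found and remaining object id vectors, as the implementation in object_manager.cc suggests, and the values are invented):

// Hypothetical call site, for illustration only.
void wait_for_two(ObjectManager &object_manager,
                  const std::vector<ObjectID> &object_ids) {
  ray::Status status = object_manager.Wait(
      object_ids, /*timeout_ms=*/500, /*num_required_objects=*/2,
      /*wait_local=*/false,
      [](const std::vector<ObjectID> &found,
         const std::vector<ObjectID> &remaining) {
        // `found` holds the objects that became available within the timeout;
        // `remaining` holds the ones that did not.
      });
  RAY_CHECK_OK(status);
}
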
private:
friend class TestObjectManager;

ClientID client_id_;
const ObjectManagerConfig config_;
std::unique_ptr<ObjectDirectoryInterface> object_directory_;
@@ -197,17 +199,20 @@ class ObjectManager : public ObjectManagerInterface {
/// Cache of locally available objects.
std::unordered_map<ObjectID, ObjectInfoT> local_objects_;

/// This is used as the callback identifier in Pull for
/// SubscribeObjectLocations. We only need one identifier because we never need to
/// subscribe multiple times to the same object during Pull.
UniqueID object_directory_pull_callback_id_ = UniqueID::from_random();
Collaborator: from_random is expensive

Collaborator: oh I see this is only called once

Collaborator: why is this field needed?

Contributor Author: It's used to distinguish object directory Subscribe handlers used for Pull vs. Wait, so that Unsubscribe can unambiguously remove the correct handler whenever Pull and Wait subscribe to the same object.

Contributor: Document this field.


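A hypothetical snippet to make the callback-id mechanism concrete (illustration only; it assumes the SubscribeObjectLocations/UnsubscribeObjectLocations signatures used elsewhere in this diff, and example_subscriptions is an invented helper):

// Illustration only: two independent subscriptions to the same object,
// distinguished by callback id. Pull uses the long-lived
// object_directory_pull_callback_id_; each Wait request uses its own wait_id.
void example_subscriptions(ObjectDirectoryInterface &object_directory,
                           const UniqueID &pull_callback_id,
                           const UniqueID &wait_id, const ObjectID &object_id) {
  // Subscription made on behalf of Pull.
  RAY_CHECK_OK(object_directory.SubscribeObjectLocations(
      pull_callback_id, object_id,
      [](const std::vector<ClientID> &client_ids, const ObjectID &id) {
        // Pull's handler: request the object from one of client_ids.
      }));
  // Independent subscription made on behalf of a Wait request.
  RAY_CHECK_OK(object_directory.SubscribeObjectLocations(
      wait_id, object_id,
      [](const std::vector<ClientID> &client_ids, const ObjectID &id) {
        // Wait's handler: mark the object as found for this wait request.
      }));
  // Removing the Wait subscription leaves Pull's untouched, because the
  // callback id disambiguates the two handlers.
  RAY_CHECK_OK(object_directory.UnsubscribeObjectLocations(wait_id, object_id));
}
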
struct WaitState {
WaitState(asio::io_service &service, int64_t wait_ms, const WaitCallback &callback)
: wait_ms(wait_ms),
WaitState(asio::io_service &service, int64_t timeout_ms, const WaitCallback &callback)
: timeout_ms(timeout_ms),
timeout_timer(std::unique_ptr<boost::asio::deadline_timer>(
new boost::asio::deadline_timer(service,
boost::posix_time::milliseconds(wait_ms)))),
new boost::asio::deadline_timer(
service, boost::posix_time::milliseconds(timeout_ms)))),
callback(callback) {}
/// The period of time to wait before invoking the callback.
int64_t wait_ms;
int64_t timeout_ms;
/// The timer used whenever wait_ms > 0.
std::unique_ptr<boost::asio::deadline_timer> timeout_timer;
/// The callback invoked when WaitCallback is complete.