Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
0e18ca7
Use pubsub instead of timeout.
elibol May 16, 2018
fc2572c
Correct status message.
elibol May 17, 2018
2e8af60
eric's feedback!
elibol May 17, 2018
b57a548
Changes from Stephanie's review.
elibol May 17, 2018
a128698
object directory changes for ray.wait.
elibol May 18, 2018
fa5c32d
Merge branch 'master' into om_pubsub
elibol May 18, 2018
0ccf46b
Merge branch 'master' into om_pubsub
elibol May 18, 2018
b02de4f
Merge branch 'om_pubsub' into om_wait
elibol May 18, 2018
f9a9e16
wait without testing or timeout=0.
elibol May 18, 2018
15b7f61
Handle remaining cases for wait.
elibol May 18, 2018
a22263b
linting
elibol May 18, 2018
8ab41f0
added tests of om wait imp.
elibol May 18, 2018
98bacfa
add local test.
elibol May 18, 2018
d518a89
Merge branch 'master' into om_wait
elibol May 21, 2018
53f33e0
plasma imp.
elibol May 24, 2018
8ef35f7
block worker as with pull.
elibol May 29, 2018
6e10f9e
local scheduler implementation of wait.
elibol May 30, 2018
9a95c65
with passing tests.
elibol May 30, 2018
aa12bd7
minor adjustments.
elibol May 30, 2018
9e1602d
Merge branch 'master' into om_wait_local_scheduler
elibol May 30, 2018
304b39c
handle return statuses.
elibol May 30, 2018
5d63bb3
enable more tests.
elibol May 30, 2018
cf1fdb2
add test for existing num_returns semantics, and maintain existing nu…
elibol May 31, 2018
531d024
move error handling to both code paths.
elibol May 31, 2018
d0d3ea4
implementing another round of feedback.
elibol May 31, 2018
62ae832
Comment on OM tests.
elibol May 31, 2018
67eef67
remove check for length zero list.
elibol Jun 1, 2018
0796a17
remove elapsed.
elibol Jun 1, 2018
dd9f0db
Preserve input/output order.
elibol Jun 1, 2018
9d4ed2b
debias local objects.
elibol Jun 1, 2018
541b88c
Merge branch 'master' into om_wait_local_scheduler
elibol Jun 1, 2018
58af739
use common helper function in object directory.
elibol Jun 1, 2018
d9ef29b
updated documentation
elibol Jun 1, 2018
fa1928b
linting.
elibol Jun 1, 2018
d41b1d0
handle return status.
elibol Jun 1, 2018
aeaab5b
simplify order preservation test + fix valgrind test error.
elibol Jun 1, 2018
048f45f
update name of final Lookup callback.
elibol Jun 2, 2018
0aa7525
Merge branch 'master' into om_wait_local_scheduler
elibol Jun 2, 2018
833939f
linting
elibol Jun 2, 2018
8e1947c
c++ style casting.
elibol Jun 2, 2018
83d04dd
linting.
elibol Jun 2, 2018
080282f
linting.
elibol Jun 2, 2018
a58f5c9
incorporate second round of feedback.
elibol Jun 5, 2018
c6d8ba5
correct python tests.
elibol Jun 5, 2018
7d8d756
test comments.
elibol Jun 5, 2018
6b6e2f3
incorporate reviews.
elibol Jun 6, 2018
3a86c93
Fixes with regression tests.
elibol Jun 6, 2018
1a99f25
update documentation.
elibol Jun 6, 2018
00eafd7
reference to avoid copy.
elibol Jun 6, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,19 @@ ray::Status ObjectManager::Wait(const std::vector<ObjectID> &object_ids,
int64_t timeout_ms, uint64_t num_required_objects,
bool wait_local, const WaitCallback &callback) {
UniqueID wait_id = UniqueID::from_random();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've had trouble with from_random being too expensive in the past.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think it was a problem for Pull, when we were polling for locations with a low timeout. It's called once for each call to Wait, so I think it is okay. Is there an alternative to from_random? How are unique ObjectIDs created?

RAY_RETURN_NOT_OK(AddWaitRequest(wait_id, object_ids, timeout_ms, num_required_objects,
wait_local, callback));
RAY_RETURN_NOT_OK(LookupRemainingWaitObjects(wait_id));
// LookupRemainingWaitObjects invokes SubscribeRemainingWaitObjects once lookup has
// been performed on all remaining objects.
return ray::Status::OK();
}

ray::Status ObjectManager::AddWaitRequest(const UniqueID &wait_id,
const std::vector<ObjectID> &object_ids,
int64_t timeout_ms,
uint64_t num_required_objects, bool wait_local,
const WaitCallback &callback) {
if (wait_local) {
return ray::Status::NotImplemented("Wait for local objects is not yet implemented.");
}
Expand All @@ -385,6 +397,12 @@ ray::Status ObjectManager::Wait(const std::vector<ObjectID> &object_ids,
}
}

return ray::Status::OK();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this method always returns OK, I would make it void.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It actually also returns ray::Status::NotImplemented currently if wait_local=true.

}

ray::Status ObjectManager::LookupRemainingWaitObjects(const UniqueID &wait_id) {
auto &wait_state = active_wait_requests_.find(wait_id)->second;

if (wait_state.remaining.empty()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we're going to include this special case here, then the condition should probably be something like wait_state.found.size() >= num_required_objects || wait_ms == 0.

That said, I think it makes sense to handle this case after firing off the LookupLocations calls.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We invoke the lookup calls immediately after checking which objects are local to obtain current information about the location of remote objects. This allows us to collect information about all given objects, regardless of their location.

If we invoke when wait_state.found.size() >= num_required_objects || wait_ms == 0 we may invoke WaitComplete before checking which remote objects are ready; we'll bias locally available objects.

If we fire LookupLocations and check wait_state.found.size() >= num_required_objects || wait_ms == 0 independently, and the condition is true, the calls to LookupLocations will do nothing because the callback to Wait will already be invoked by the time any of the handlers for LookupLocations are invoked for the given wait_id.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment to this if block detailing the reason for this condition? I had the same question as Robert when reading this code, so it'd be good to explain to the reader why you still have to do lookups for the remaining objects.

WaitComplete(wait_id);
} else {
Expand All @@ -396,7 +414,7 @@ ray::Status ObjectManager::Wait(const std::vector<ObjectID> &object_ids,
for (const auto &object_id : wait_state.remaining) {
// Lookup remaining objects.
wait_state.requested_objects.insert(object_id);
RAY_CHECK_OK(object_directory_->LookupLocations(
RAY_RETURN_NOT_OK(object_directory_->LookupLocations(
object_id, [this, wait_id](const std::vector<ClientID> &client_ids,
const ObjectID &lookup_object_id) {
auto &wait_state = active_wait_requests_.find(wait_id)->second;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

couldn't find not succeed at finding the wait_id?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only way to progress to a point where active_wait_requests_.erase(wait_id) is invoked is within WaitLookupComplete, which is only invoked when requested_objects.empty(), at which point no more handlers will be invoked with the same wait_id.

Expand All @@ -406,40 +424,47 @@ ray::Status ObjectManager::Wait(const std::vector<ObjectID> &object_ids,
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would it mean if client_ids.empty() == true? That would mean that the object has been evicted? How do we want to handle that case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may mean that the object has been evicted. It may also mean that no entries exist for the object. In both cases, after the round of lookups, if we need to wait for more objects, SubscribeObjectLocations will be invoked on objects for which client_ids.empty() == true (in AllWaitLookupsComplete). The handler to SubscribeObjectLocations will be invoked when !client_ids.empty().

wait_state.requested_objects.erase(lookup_object_id);
if (wait_state.requested_objects.empty()) {
AllWaitLookupsComplete(wait_id);
SubscribeRemainingWaitObjects(wait_id);
}
}));
}
}
return ray::Status::OK();
}

void ObjectManager::AllWaitLookupsComplete(const UniqueID &wait_id) {
void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) {
auto &wait_state = active_wait_requests_.find(wait_id)->second;
if (wait_state.found.size() >= wait_state.num_required_objects ||
wait_state.timeout_ms == 0) {
// Requirements already satisfied.
WaitComplete(wait_id);
} else {
// Subscribe to objects in order to ensure Wait-related tests are deterministic.
// Wait may complete during the execution of any one of the following calls to
// SubscribeObjectLocations, so copy the object ids that need to be iterated over.
// Order matters for test purposes.
std::vector<ObjectID> ordered_remaining_object_ids;
for (const auto &object_id : wait_state.object_id_order) {
if (wait_state.remaining.count(object_id) == 0) {
continue;
if (wait_state.remaining.count(object_id) > 0) {
ordered_remaining_object_ids.push_back(object_id);
}
// Subscribe to object notifications.
}
for (const auto &object_id : ordered_remaining_object_ids) {
if (active_wait_requests_.find(wait_id) == active_wait_requests_.end()) {
// This is possible if an object's location is obtained immediately,
// within the current callstack. In this case, WaitComplete has been
// invoked already, so we're done.
return;
}
wait_state.requested_objects.insert(object_id);
// Subscribe to object notifications.
RAY_CHECK_OK(object_directory_->SubscribeObjectLocations(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do SubscribeObjectLocations here in addition to the LookupLocations in Wait? Wouldn't it make sense to just do one of those?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lookup calls return immediately, regardless of whether there's a known location of the object. After the lookup calls, if the conditions of the wait call are still not satisfied, we start the timer and subscribe to each remaining object. The subscription callback will be invoked only when an object's location is found. We wait until the minimum number of objects is found or the timeout is triggered.

wait_id, object_id, [this, wait_id](const std::vector<ClientID> &client_ids,
const ObjectID &subscribe_object_id) {
auto object_id_wait_state = active_wait_requests_.find(wait_id);
// We never expect to handle a subscription notification for a wait that has
// already completed.
RAY_CHECK(object_id_wait_state != active_wait_requests_.end());
auto &wait_state = active_wait_requests_.find(wait_id)->second;
auto &wait_state = object_id_wait_state->second;
RAY_CHECK(wait_state.remaining.erase(subscribe_object_id));
wait_state.found.insert(subscribe_object_id);
wait_state.requested_objects.erase(subscribe_object_id);
Expand Down
13 changes: 12 additions & 1 deletion src/ray/object_manager/object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,20 @@ class ObjectManager : public ObjectManagerInterface {

/// A set of active wait requests.
std::unordered_map<UniqueID, WaitState> active_wait_requests_;

/// Creates a wait request and adds it to active_wait_requests_.
ray::Status AddWaitRequest(const UniqueID &wait_id,
const std::vector<ObjectID> &object_ids, int64_t timeout_ms,
uint64_t num_required_objects, bool wait_local,
const WaitCallback &callback);

/// Lookup any remaining objects that are not local. This is invoked after
/// the wait request is created and local objects are identified.
ray::Status LookupRemainingWaitObjects(const UniqueID &wait_id);

/// Invoked when lookup for remaining objects has been invoked. This method subscribes
/// to any remaining objects if wait conditions have not yet been satisfied.
void AllWaitLookupsComplete(const UniqueID &wait_id);
void SubscribeRemainingWaitObjects(const UniqueID &wait_id);
/// Completion handler for Wait.
void WaitComplete(const UniqueID &wait_id);

Expand Down
41 changes: 22 additions & 19 deletions src/ray/object_manager/test/object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,16 @@ class MockServer {
}

friend class TestObjectManager;
friend class TestObjectManagerCommands;

boost::asio::ip::tcp::acceptor object_manager_acceptor_;
boost::asio::ip::tcp::socket object_manager_socket_;
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
ObjectManager object_manager_;
};

class TestObjectManager : public ::testing::Test {
class TestObjectManagerBase : public ::testing::Test {
public:
TestObjectManager() {}
TestObjectManagerBase() {}

std::string StartStore(const std::string &id) {
std::string store_id = "/tmp/store";
Expand Down Expand Up @@ -176,10 +175,6 @@ class TestObjectManager : public ::testing::Test {

void object_added_handler_2(ObjectID object_id) { v2.push_back(object_id); };

ObjectDirectoryInterface &get_object_directory(const MockServer &server) {
return *server.object_manager_.object_directory_;
}

protected:
std::thread p;
boost::asio::io_service main_service;
Expand All @@ -199,7 +194,7 @@ class TestObjectManager : public ::testing::Test {
uint push_timeout_ms;
};

class TestObjectManagerCommands : public TestObjectManager {
class TestObjectManager : public TestObjectManagerBase {
public:
int current_wait_test = -1;
int num_connected_clients = 0;
Expand Down Expand Up @@ -273,35 +268,38 @@ class TestObjectManagerCommands : public TestObjectManager {
uint num_expected_objects1 = 1;
uint num_expected_objects2 = 2;
if (v1.size() == num_expected_objects1 && v2.size() == num_expected_objects2) {
TestWaitCallbacks1();
SubscribeObjectThenWait();
}
}

void TestWaitCallbacks1() {
void SubscribeObjectThenWait() {
int data_size = 100;
// Test to ensure Wait works properly during an active subscription to the same
// object.
ObjectID object_1 = WriteDataToClient(client2, data_size);
ObjectID object_2 = WriteDataToClient(client2, data_size);
UniqueID sub_id = ray::ObjectID::from_random();

RAY_CHECK_OK(get_object_directory(*server1).SubscribeObjectLocations(
RAY_CHECK_OK(server1->object_manager_.object_directory_->SubscribeObjectLocations(
sub_id, object_1,
[this, sub_id, object_1, object_2](const std::vector<ray::ClientID> &,
const ray::ObjectID &object_id) {
TestWaitCallbacks2(sub_id, object_1, object_2);
TestWaitWhileSubscribed(sub_id, object_1, object_2);
}));
}

void TestWaitCallbacks2(UniqueID sub_id, ObjectID object_1, ObjectID object_2) {
void TestWaitWhileSubscribed(UniqueID sub_id, ObjectID object_1, ObjectID object_2) {
int num_objects = 2;
int required_objects = 1;
int timeout_ms = 1000;

std::vector<ObjectID> object_ids = {object_1, object_2};
boost::posix_time::ptime start_time = boost::posix_time::second_clock::local_time();
RAY_CHECK_OK(server1->object_manager_.Wait(
object_ids, timeout_ms, required_objects, false,

UniqueID wait_id = UniqueID::from_random();

RAY_CHECK_OK(server1->object_manager_.AddWaitRequest(
wait_id, object_ids, timeout_ms, required_objects, false,
[this, sub_id, object_1, object_ids, num_objects, start_time](
const std::vector<ray::ObjectID> &found,
const std::vector<ray::ObjectID> &remaining) {
Expand All @@ -313,10 +311,14 @@ class TestObjectManagerCommands : public TestObjectManager {
RAY_CHECK(found.size() == 1);
// There's nothing more to test. A check will fail if unexpected behavior is
// triggered.
RAY_CHECK_OK(get_object_directory(*server1).UnsubscribeObjectLocations(
sub_id, object_1));
RAY_CHECK_OK(
server1->object_manager_.object_directory_->UnsubscribeObjectLocations(
sub_id, object_1));
NextWaitTest();
}));

// Skip lookups and rely on Subscribe only to test subscribe interaction.
server1->object_manager_.SubscribeRemainingWaitObjects(wait_id);
}

void NextWaitTest() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we separate each of the test cases into a separate unit test, and also use the gtest framework to be consistent with other raylet tests? Also, please document what case each of these is supposed to be testing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, excellent point. I have a branch I'd like to merge before separating each test into a separate unit test as this is a somewhat substantial change.

Agree and will do the rest.

Expand Down Expand Up @@ -400,7 +402,8 @@ class TestObjectManagerCommands : public TestObjectManager {

switch (current_wait_test) {
case 0: {
// Ensure timeout_ms = 0 returns expected number of found / remaining objects.
// Ensure timeout_ms = 0 returns expected number of found and remaining
// objects.
ASSERT_TRUE(found.size() <= required_objects);
ASSERT_TRUE(static_cast<int>(found.size() + remaining.size()) == num_objects);
NextWaitTest();
Expand Down Expand Up @@ -454,7 +457,7 @@ class TestObjectManagerCommands : public TestObjectManager {
}
};

TEST_F(TestObjectManagerCommands, StartTestObjectManagerCommands) {
TEST_F(TestObjectManager, StartTestObjectManager) {
auto AsyncStartTests = main_service.wrap([this]() { WaitConnections(); });
AsyncStartTests();
main_service.run();
Expand Down